
Automated Test: span-flusher-multiprocess #324

Closed

Conversation


@admin-coderabbit commented Feb 4, 2026 (Owner)

This pull request was automatically created by @coderabbitai/e2e-reviewer.

Batch created pull request.

Summary by CodeRabbit

  • New Features

    • Added --flusher-processes CLI option to control the number of span processing workers.
  • Improvements

    • Enhanced span flusher with advanced multiprocessing architecture for improved throughput and performance.
  • Documentation

    • Added illustrative examples demonstrating correct vs. incorrect type-checking approaches in anti-patterns guide.


coderabbit-eval bot commented Feb 4, 2026

📝 Walkthrough

Changes introduce configurable multiprocessing support for span flushing with per-shard distribution. A new flusher_processes CLI option is added to the Kafka consumer configuration, threaded through the factory layer, and implemented in the flusher with per-process state tracking and health monitoring.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| **Documentation**<br>`CLAUDE.md` | Added documentation examples contrasting incorrect `hasattr()` vs. correct `isinstance()` approaches for type checking in union scenarios. |
| **CLI Configuration**<br>`src/sentry/consumers/__init__.py` | Added `--flusher-processes` CLI option to the `process-spans` Kafka consumer configuration with a default value of 1. |
| **Factory Layer**<br>`src/sentry/spans/consumers/process/factory.py` | Added optional `flusher_processes` parameter to `ProcessSpansStrategyFactory.__init__` and propagated it to `SpanFlusher` instantiation via the `max_processes` argument. |
| **Flusher Implementation**<br>`src/sentry/spans/consumers/process/flusher.py` | Refactored `SpanFlusher` to support per-shard multiprocessing with shard-to-process mapping, per-process state tracking (healthy/backpressure), per-shard buffers, and dynamic process creation/restart logic. Updated `__init__` to accept `max_processes` and `main` to accept a `shards` list for shard-scoped instrumentation. |
| **Consumer Tests**<br>`tests/sentry/spans/consumers/process/test_consumer.py` | Modified `test_basic` with an additional polling and sleep step; added a new `test_flusher_processes_limit` test to verify process distribution across shards with a configurable flusher process count. |
| **Flusher Tests**<br>`tests/sentry/spans/consumers/process/test_flusher.py` | Updated the backpressure assertion to check a collection of per-process backpressure states instead of a single flag. |

Sequence Diagram

```mermaid
sequenceDiagram
    participant CLI as CLI Configuration
    participant Factory as ProcessSpansStrategyFactory
    participant Flusher as SpanFlusher
    participant Proc1 as Flusher Process 1
    participant Proc2 as Flusher Process 2
    participant Shards as Shard Buffer

    CLI->>Factory: Create with flusher_processes=2
    Factory->>Flusher: Initialize(max_processes=2, shards=[0,1,2,3])
    Flusher->>Flusher: Map shards to processes (shard_to_process)
    Flusher->>Proc1: Create process for shards [0,1]
    Flusher->>Proc2: Create process for shards [2,3]

    Note over Proc1,Proc2: Per-process monitoring loop
    Flusher->>Proc1: Monitor health/backpressure
    Flusher->>Proc2: Monitor health/backpressure

    Proc1->>Shards: Poll shard 0 buffer
    Proc1->>Shards: Poll shard 1 buffer
    Proc2->>Shards: Poll shard 2 buffer
    Proc2->>Shards: Poll shard 3 buffer

    alt Process unhealthy
        Flusher->>Proc1: Restart process
    else Backpressure detected
        Flusher->>Flusher: Update process_backpressure_since
    end
```
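The shard-to-process split shown in the diagram (shards [0,1] to one process, [2,3] to another) can be sketched as contiguous chunking. The function name and exact distribution strategy below are assumptions for illustration; the real mapping lives inside `SpanFlusher`:

```python
def map_shards_to_processes(shards: list[int], max_processes: int) -> dict[int, list[int]]:
    """Partition shards into at most max_processes contiguous groups."""
    num_processes = max(1, min(max_processes, len(shards)))
    chunk = -(-len(shards) // num_processes)  # ceiling division
    return {
        i: shards[i * chunk : (i + 1) * chunk]
        for i in range(num_processes)
    }


# Matches the diagram: 4 shards across 2 flusher processes
print(map_shards_to_processes([0, 1, 2, 3], 2))  # {0: [0, 1], 1: [2, 3]}
```

Clamping `num_processes` to the shard count means asking for more processes than shards simply yields one shard per process, which is consistent with the `test_flusher_processes_limit` test described in the walkthrough.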

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 Whiskers twitch with delight,
Spans now split across processes bright,
Shards dancing through the flusher's care,
Multiprocessing magic in the air!

🚥 Pre-merge checks | ✅ 1 | ❌ 1

❌ Failed checks (1 inconclusive)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Title check | ❓ Inconclusive | The title 'Automated Test: span-flusher-multiprocess' is vague and generic; the 'Automated Test' prefix conveys nothing about the actual changes made. | Replace the generic 'Automated Test:' prefix with a descriptive summary of the main change, such as 'Add multiprocessing support to span flusher with configurable process limits'. |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch span-flusher-multiprocess

Comment @coderabbit eval help to get the list of available commands and usage tips.

@coderabbit-eval coderabbit-eval bot closed this Feb 4, 2026

@coderabbit-eval coderabbit-eval bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@src/sentry/spans/consumers/process/flusher.py`:
- Around line 185-199: The metric tag key is inconsistent: earlier calls use
tags={"shard": shard_tag} while the wait_produce timer uses tags={"shards":
shard_tag}; update the metrics.timer call for
"spans.buffer.flusher.wait_produce" to use the same tag key ("shard") as the
other metrics (i.e., change the tags dict for the wait_produce metrics.timer to
{"shard": shard_tag}) so all metrics (see the metrics.timer and metrics.timing
calls around flushed_segments, KafkaPayload, and produce) use a consistent tag
name.
- Around line 127-132: Remove the dead helper method _create_process_for_shard:
it's never called (the restart logic in _ensure_processes_alive already uses
_create_process_for_shards), so delete the entire _create_process_for_shard
definition and any related unused imports or references; keep
_create_process_for_shards and process_to_shards_map intact to preserve the
actual restart flow.
🧹 Nitpick comments (2)
src/sentry/spans/consumers/process/flusher.py (2)

247-251: Consider extracting the error message to a constant or using a custom exception.

Per static analysis (TRY003), long messages in exceptions are better handled via custom exception classes for consistency and maintainability.

Proposed refactor

```diff
+class FlusherProcessCrashedError(RuntimeError):
+    def __init__(self, shards: list[int], cause: str):
+        super().__init__(f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer")

 # In _ensure_processes_alive:
-            if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
-                raise RuntimeError(
-                    f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
-                )
+            if self.process_restarts[process_index] > MAX_PROCESS_RESTARTS:
+                raise FlusherProcessCrashedError(shards, cause)
```
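As a runnable illustration of that refactor (class name and message taken from the diff above; the restart-counting context is simplified away):

```python
class FlusherProcessCrashedError(RuntimeError):
    """Raised when a flusher process exceeds its restart budget (TRY003 pattern)."""

    def __init__(self, shards: list[int], cause: str):
        # The long message lives in one place: the exception class itself.
        super().__init__(
            f"flusher process for shards {shards} crashed repeatedly ({cause}), restarting consumer"
        )


try:
    raise FlusherProcessCrashedError([0, 1], "exitcode=1")
except FlusherProcessCrashedError as exc:
    print(exc)  # flusher process for shards [0, 1] crashed repeatedly (exitcode=1), restarting consumer
```

Call sites then raise `FlusherProcessCrashedError(shards, cause)` without repeating the message string, which is the maintainability benefit the TRY003 lint is after.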

337-347: Rename unused loop variable per static analysis.

The process_index variable is not used within the loop body (B007).

Proposed fix

```diff
-        for process_index, process in self.processes.items():
+        for _process_index, process in self.processes.items():
```

Comment on lines +127 to +132
```python
def _create_process_for_shard(self, shard: int):
    # Find which process this shard belongs to and restart that process
    for process_index, shards in self.process_to_shards_map.items():
        if shard in shards:
            self._create_process_for_shards(process_index, shards)
            break
```


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
# Search for any usages of _create_process_for_shard in the codebase
rg -n '_create_process_for_shard' --type=py
```

Repository: admin-coderabbit/coderabbit-sentry-mirror

Length of output: 671


Unused method _create_process_for_shard.

This method is defined at lines 127-132 but never called anywhere in the codebase. The process restart logic in _ensure_processes_alive (line 259) directly calls the plural version _create_process_for_shards instead. Remove this unused method.

🤖 Prompt for AI Agents
In `@src/sentry/spans/consumers/process/flusher.py` around lines 127 - 132, Remove
the dead helper method _create_process_for_shard: it's never called (the restart
logic in _ensure_processes_alive already uses _create_process_for_shards), so
delete the entire _create_process_for_shard definition and any related unused
imports or references; keep _create_process_for_shards and process_to_shards_map
intact to preserve the actual restart flow.

Comment on lines +185 to +199
```diff
 with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}):
     for flushed_segment in flushed_segments.values():
         if not flushed_segment.spans:
             continue

         spans = [span.payload for span in flushed_segment.spans]
         kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
-        metrics.timing("spans.buffer.segment_size_bytes", len(kafka_payload.value))
+        metrics.timing(
+            "spans.buffer.segment_size_bytes",
+            len(kafka_payload.value),
+            tags={"shard": shard_tag},
+        )
         produce(kafka_payload)

-with metrics.timer("spans.buffer.flusher.wait_produce"):
+with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
```


⚠️ Potential issue | 🟡 Minor

Inconsistent metric tag naming: shard vs shards.

The metric tags use inconsistent naming - "shard" on lines 185 and 195-196, but "shards" on line 199. This inconsistency could make metrics aggregation and querying more difficult.

Proposed fix for consistent tag naming

```diff
-                with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
+                with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

```diff
 with metrics.timer("spans.buffer.flusher.produce", tags={"shard": shard_tag}):
     for flushed_segment in flushed_segments.values():
         if not flushed_segment.spans:
             continue

         spans = [span.payload for span in flushed_segment.spans]
         kafka_payload = KafkaPayload(None, orjson.dumps({"spans": spans}), [])
         metrics.timing(
             "spans.buffer.segment_size_bytes",
             len(kafka_payload.value),
             tags={"shard": shard_tag},
         )
         produce(kafka_payload)

-with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shards": shard_tag}):
+with metrics.timer("spans.buffer.flusher.wait_produce", tags={"shard": shard_tag}):
```
🤖 Prompt for AI Agents
In `@src/sentry/spans/consumers/process/flusher.py` around lines 185 - 199, The
metric tag key is inconsistent: earlier calls use tags={"shard": shard_tag}
while the wait_produce timer uses tags={"shards": shard_tag}; update the
metrics.timer call for "spans.buffer.flusher.wait_produce" to use the same tag
key ("shard") as the other metrics (i.e., change the tags dict for the
wait_produce metrics.timer to {"shard": shard_tag}) so all metrics (see the
metrics.timer and metrics.timing calls around flushed_segments, KafkaPayload,
and produce) use a consistent tag name.
